public void configure(FlowSpecifier flowSpecifier) {
// Set up some meta data
flowSpecifier.name("TwitterScanner");
flowSpecifier.email("dev@continuuity.com");
flowSpecifier.application("Twitter Demo");
// add the sorted counters data set
flowSpecifier.dataset(topHashTags);
flowSpecifier.dataset(topUsers);
flowSpecifier.dataset(wordCounts);
flowSpecifier.dataset(hashTagWordAssocs);
// Now wire up the Flow for real
flowSpecifier.flowlet("StreamReader", TwitterGenerator.class, 1);
flowSpecifier.flowlet("Processor", TwitterProcessor.class, 1);
flowSpecifier.flowlet("WordIndexer", TwitterWordIndexer.class, 1);
flowSpecifier.flowlet("HashTagIndexer", TwitterHashTagIndexer.class, 1);
// Connect to the next Flowlet
flowSpecifier.connection("StreamReader", "Processor");
flowSpecifier.connection("Processor", "WordIndexer");
flowSpecifier.connection("Processor", "HashTagIndexer");
}
}
@Override
public FlowSpecification configure() {
return FlowSpecification.Builder.with()
.setName("TwitterScanner")
.setDescription("Twitter Demo")
.withFlowlets()
.add("StreamReader", new TwitterGenerator(), 1)
.add("Processors", new TwitterProcessor(), 1)
.add("WordIndexer", new TwitterWordIndexer(), 1)
.add("HashTagIndexer", new TwitterHashTagIndexer(), 1)
.connect()
.from("StreamReader").to("Processor")
.from("Processor").to("WordIndexer")
.from("Processor").to("HashTagIndexer")
.build();
}
}